/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.xml;



import com.google.auto.value.AutoValue;

import com.bff.gaia.unified.sdk.io.*;

import com.bff.gaia.unified.sdk.io.fs.ResourceId;

import com.bff.gaia.unified.sdk.options.ValueProvider;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.transforms.display.HasDisplayData;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PDone;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.sdk.io.BoundedSource;

import com.bff.gaia.unified.sdk.io.CompressedSource;

import com.bff.gaia.unified.sdk.io.Compression;

import com.bff.gaia.unified.sdk.io.FileIO;

import com.bff.gaia.unified.sdk.io.FileSystems;

import com.bff.gaia.unified.sdk.io.OffsetBasedSource;

import com.bff.gaia.unified.sdk.io.ReadAllViaFileBasedSource;



import javax.annotation.Nullable;

import javax.xml.bind.JAXBContext;

import javax.xml.bind.JAXBException;

import javax.xml.bind.Marshaller;

import javax.xml.bind.ValidationEventHandler;

import java.io.IOException;

import java.io.OutputStream;

import java.io.Serializable;

import java.nio.channels.Channels;

import java.nio.channels.WritableByteChannel;

import java.nio.charset.Charset;

import java.nio.charset.StandardCharsets;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/** Transforms for reading and writing XML files using JAXB mappers. */

public class XmlIO {

  // CHECKSTYLE.OFF: JavadocStyle
  /**
   * Creates a transform that reads XML files as a {@link PCollection} of a given type mapped via
   * JAXB.
   *
   * <p>The XML files must be of the following form, where {@code root} and {@code record} are XML
   * element names that are defined by the user:
   *
   * <pre>{@code
   * <root>
   * <record> ... </record>
   * <record> ... </record>
   * <record> ... </record>
   * ...
   * <record> ... </record>
   * </root>
   * }</pre>
   *
   * <p>In other words, the XML document should contain a single root element with an inner list
   * consisting entirely of record elements. The records may contain arbitrary XML content; however,
   * that content <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags.
   * This restriction enables reading from large XML files in parallel from different offsets in the
   * file.
   *
   * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
   * Users must also provide a class of a JAXB annotated Java type that can be used to convert
   * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms.
   * Reading the source will generate a {@code PCollection} of the given JAXB annotated Java type.
   * Optionally users may provide a minimum size of a bundle that should be created for the source.
   *
   * <p>Example:
   *
   * <pre>{@code
   * PCollection<Record> output = p.apply(XmlIO.<Record>read()
   *     .from(file.toPath().toString())
   *     .withRootElement("root")
   *     .withRecordElement("record")
   *     .withRecordClass(Record.class));
   * }</pre>
   *
   * <p>The returned transform starts with the UTF-8 charset, a minimum bundle size of {@code 1}
   * and {@link Compression#AUTO}. To specify a different charset, use {@link Read#withCharset}.
   *
   * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
   * contains multi-byte characters may result in data loss or duplication.
   *
   * @param <T> Type of the objects that represent the records of the XML file. The {@code
   *     PCollection} generated by this source will be of this type.
   */
  // CHECKSTYLE.ON: JavadocStyle
  public static <T> Read<T> read() {
    // Only the charset is preset; root element, record element and record class stay unset.
    MappingConfiguration<T> defaultConfiguration =
        new AutoValue_XmlIO_MappingConfiguration.Builder<T>()
            .setCharset(StandardCharsets.UTF_8.name())
            .build();

    return new AutoValue_XmlIO_Read.Builder<T>()
        .setConfiguration(defaultConfiguration)
        .setMinBundleSize(1L)
        .setCompression(Compression.AUTO)
        .build();
  }



  /**
   * Like {@link #read}, but reads each file in a {@link PCollection} of {@link
   * FileIO.ReadableFile}, which allows more flexible usage via different configuration options of
   * {@link FileIO#match} and {@link FileIO#readMatches} that are not explicitly provided for {@link
   * #read}.
   *
   * <p>For example:
   *
   * <pre>{@code
   * PCollection<ReadableFile> files = p
   *     .apply(FileIO.match().filepattern(options.getInputFilepatternProvider()).continuously(
   *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardMinutes(5))))
   *     .apply(FileIO.readMatches().withCompression(GZIP));
   *
   * PCollection<Record> output = files.apply(XmlIO.<Record>readFiles()
   *     .withRootElement("root")
   *     .withRecordElement("record")
   *     .withRecordClass(Record.class));
   * }</pre>
   *
   * <p>The returned transform starts with the UTF-8 charset; use {@link ReadFiles#withCharset} to
   * change it.
   */
  public static <T> ReadFiles<T> readFiles() {
    // Only the charset is preset; the remaining mapping settings must be supplied by the caller.
    MappingConfiguration<T> defaultConfiguration =
        new AutoValue_XmlIO_MappingConfiguration.Builder<T>()
            .setCharset(StandardCharsets.UTF_8.name())
            .build();

    return new AutoValue_XmlIO_ReadFiles.Builder<T>()
        .setConfiguration(defaultConfiguration)
        .build();
  }



  /**
   * Creates a transform that writes all elements in the input {@link PCollection} to a single XML
   * file using {@link #sink}. The returned transform starts with the UTF-8 charset.
   *
   * <p>For more configurable usage, use {@link #sink} directly with {@link FileIO#write} or {@link
   * FileIO#writeDynamic}.
   */
  public static <T> Write<T> write() {
    String defaultCharsetName = StandardCharsets.UTF_8.name();
    Write.Builder<T> writeBuilder = new AutoValue_XmlIO_Write.Builder<T>();
    return writeBuilder.setCharset(defaultCharsetName).build();
  }



  /**
   * Serializable value object holding the JAXB mapping settings shared by the read transforms:
   * root element, record element, record class, charset name and an optional validation event
   * handler. Every property may be unset ({@code null}); {@link #validate()} enforces the required
   * ones.
   */
  @AutoValue
  abstract static class MappingConfiguration<T> implements HasDisplayData, Serializable {
    /** Name of the XML root element, or {@code null} if not set. */
    @Nullable
    abstract String getRootElement();

    /** Name of the XML record element, or {@code null} if not set. */
    @Nullable
    abstract String getRecordElement();

    /** JAXB annotated record class, or {@code null} if not set. */
    @Nullable
    abstract Class<T> getRecordClass();

    /** Name of the charset, or {@code null} if not set. */
    @Nullable
    abstract String getCharset();

    /** JAXB validation event handler, or {@code null} if not set. */
    @Nullable
    abstract ValidationEventHandler getValidationEventHandler();

    /** Returns a builder pre-populated with the values of this configuration. */
    abstract Builder<T> toBuilder();

    /** AutoValue builder for {@link MappingConfiguration}. */
    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setRootElement(String rootElement);

      abstract Builder<T> setRecordElement(String recordElement);

      abstract Builder<T> setRecordClass(Class<T> recordClass);

      abstract Builder<T> setCharset(String charset);

      abstract Builder<T> setValidationEventHandler(ValidationEventHandler validationEventHandler);

      abstract MappingConfiguration<T> build();
    }

    /** Returns a copy of this configuration with the root element replaced. */
    private MappingConfiguration<T> withRootElement(String rootElement) {
      Builder<T> updated = toBuilder().setRootElement(rootElement);
      return updated.build();
    }

    /** Returns a copy of this configuration with the record element replaced. */
    private MappingConfiguration<T> withRecordElement(String recordElement) {
      Builder<T> updated = toBuilder().setRecordElement(recordElement);
      return updated.build();
    }

    /** Returns a copy of this configuration with the record class replaced. */
    private MappingConfiguration<T> withRecordClass(Class<T> recordClass) {
      Builder<T> updated = toBuilder().setRecordClass(recordClass);
      return updated.build();
    }

    /** Returns a copy of this configuration that stores the name of the given charset. */
    private MappingConfiguration<T> withCharset(Charset charset) {
      String charsetName = charset.name();
      return toBuilder().setCharset(charsetName).build();
    }

    /** Returns a copy of this configuration with the validation event handler replaced. */
    private MappingConfiguration<T> withValidationEventHandler(
        ValidationEventHandler validationEventHandler) {
      Builder<T> updated = toBuilder().setValidationEventHandler(validationEventHandler);
      return updated.build();
    }

    /**
     * Checks that root element, record element, record class and charset are all set.
     *
     * @throws IllegalArgumentException naming the first missing setting
     */
    private void validate() {
      boolean hasRootElement = getRootElement() != null;
      checkArgument(hasRootElement, "withRootElement() is required");

      boolean hasRecordElement = getRecordElement() != null;
      checkArgument(hasRecordElement, "withRecordElement() is required");

      boolean hasRecordClass = getRecordClass() != null;
      checkArgument(hasRecordClass, "withRecordClass() is required");

      boolean hasCharset = getCharset() != null;
      checkArgument(hasCharset, "withCharset() is required");
    }

    /**
     * Registers root element, record element, record class and charset as display data; unset
     * values are skipped. The validation event handler is not reported.
     */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      builder
          .addIfNotNull(
              DisplayData.item("rootElement", getRootElement())
                  .withLabel("XML Root Element"))
          .addIfNotNull(
              DisplayData.item("recordElement", getRecordElement())
                  .withLabel("XML Record Element"))
          .addIfNotNull(
              DisplayData.item("recordClass", getRecordClass())
                  .withLabel("XML Record Class"))
          .addIfNotNull(
              DisplayData.item("charset", getCharset())
                  .withLabel("Charset"));
    }
  }



  /**
   * Implementation of {@link #read}.
   *
   * <p>Every configuration method returns a new instance rebuilt from this one; nothing is modified
   * in place.
   */
  @AutoValue
  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
    /** Returns the JAXB mapping settings used by this transform. */
    abstract MappingConfiguration<T> getConfiguration();

    /** Returns the file or glob pattern to read, or {@code null} if {@link #from} was not called. */
    @Nullable
    abstract ValueProvider<String> getFileOrPatternSpec();

    /** Returns the compression applied when reading the input files. */
    abstract Compression getCompression();

    /** Returns the minimum bundle size handed to the underlying source. */
    abstract long getMinBundleSize();

    /** Returns a builder pre-populated with the values of this transform. */
    abstract Builder<T> toBuilder();

    /** AutoValue builder for {@link Read}. */
    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setConfiguration(MappingConfiguration<T> configuration);

      abstract Builder<T> setFileOrPatternSpec(ValueProvider<String> fileOrPatternSpec);

      abstract Builder<T> setCompression(Compression compression);

      abstract Builder<T> setMinBundleSize(long minBundleSize);

      abstract Read<T> build();
    }

    /** @deprecated Use {@link Compression} instead. */
    @Deprecated
    public enum CompressionType {
      /** @see Compression#AUTO */
      AUTO(Compression.AUTO),

      /** @see Compression#UNCOMPRESSED */
      UNCOMPRESSED(Compression.UNCOMPRESSED),

      /** @see Compression#GZIP */
      GZIP(Compression.GZIP),

      /** @see Compression#BZIP2 */
      BZIP2(Compression.BZIP2),

      /** @see Compression#ZIP */
      ZIP(Compression.ZIP),

      /** @see Compression#DEFLATE */
      DEFLATE(Compression.DEFLATE);

      /** The {@link Compression} constant this legacy constant delegates to. */
      private final Compression canonical;

      CompressionType(Compression canonical) {
        this.canonical = canonical;
      }

      /** @see Compression#matches */
      public boolean matches(String filename) {
        return canonical.matches(filename);
      }
    }

    /**
     * Reads a single XML file or a set of XML files defined by a Java "glob" file pattern. Each XML
     * file should be of the form defined in {@link #read}.
     */
    public Read<T> from(String fileOrPatternSpec) {
      return from(ValueProvider.StaticValueProvider.of(fileOrPatternSpec));
    }

    /**
     * Reads a single XML file or a set of XML files defined by a Java "glob" file pattern. Each XML
     * file should be of the form defined in {@link #read}. Using ValueProviders.
     */
    public Read<T> from(ValueProvider<String> fileOrPatternSpec) {
      return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build();
    }

    /** Returns a copy of this transform that uses the given mapping configuration. */
    private Read<T> withConfiguration(MappingConfiguration<T> configuration) {
      return toBuilder().setConfiguration(configuration).build();
    }

    /**
     * Sets name of the root element of the XML document. This will be used to create a valid
     * starting root element when initiating a bundle of records created from an XML document. This
     * is a required parameter.
     */
    public Read<T> withRootElement(String rootElement) {
      return withConfiguration(getConfiguration().withRootElement(rootElement));
    }

    /**
     * Sets name of the record element of the XML document. This will be used to determine offset of
     * the first record of a bundle created from the XML document. This is a required parameter.
     */
    public Read<T> withRecordElement(String recordElement) {
      return withConfiguration(getConfiguration().withRecordElement(recordElement));
    }

    /**
     * Sets a JAXB annotated class that can be populated using a record of the provided XML file.
     * This will be used when unmarshalling record objects from the XML file. This is a required
     * parameter.
     */
    public Read<T> withRecordClass(Class<T> recordClass) {
      return withConfiguration(getConfiguration().withRecordClass(recordClass));
    }

    /**
     * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please
     * refer to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional
     * parameter; the value set here is passed to the source created by this transform.
     */
    public Read<T> withMinBundleSize(long minBundleSize) {
      return toBuilder().setMinBundleSize(minBundleSize).build();
    }

    /** @deprecated use {@link #withCompression}. */
    @Deprecated
    public Read<T> withCompressionType(CompressionType compressionType) {
      return withCompression(compressionType.canonical);
    }

    /** Decompresses all input files using the specified compression type. */
    public Read<T> withCompression(Compression compression) {
      return toBuilder().setCompression(compression).build();
    }

    /** Sets the XML file charset. */
    public Read<T> withCharset(Charset charset) {
      return withConfiguration(getConfiguration().withCharset(charset));
    }

    /**
     * Sets the {@link ValidationEventHandler} to use with JAXB. Calling this with a {@code null}
     * parameter will cause the JAXB unmarshaller event handler to be unspecified.
     */
    public Read<T> withValidationEventHandler(ValidationEventHandler validationEventHandler) {
      return withConfiguration(
          getConfiguration().withValidationEventHandler(validationEventHandler));
    }

    /**
     * Registers the minimum bundle size (only when it differs from {@code 1}), the file pattern
     * (only when set) and the nested mapping configuration as display data.
     */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      builder
          .addIfNotDefault(
              DisplayData.item("minBundleSize", getMinBundleSize())
                  .withLabel("Minimum Bundle Size"),
              1L)
          // The file pattern is nullable until from() is called, so it must be added
          // conditionally rather than unconditionally.
          .addIfNotNull(
              DisplayData.item("filePattern", getFileOrPatternSpec()).withLabel("File Pattern"))
          .include("configuration", getConfiguration());
    }

    /**
     * Builds the source for this transform: an {@link XmlSource} over the configured file pattern,
     * mapping configuration and minimum bundle size, wrapped in a {@link CompressedSource} with
     * the configured compression.
     */
    @VisibleForTesting
    BoundedSource<T> createSource() {
      // Pass the configured minimum bundle size instead of a hard-coded 1L so that
      // withMinBundleSize() actually takes effect (the default is still 1L).
      return CompressedSource.from(
              new XmlSource<>(getFileOrPatternSpec(), getConfiguration(), getMinBundleSize()))
          .withCompression(getCompression());
    }

    /**
     * Validates the configuration and applies a read from the source built by {@link
     * #createSource()}.
     *
     * @throws IllegalArgumentException if a required mapping setting or the file pattern is missing
     */
    @Override
    public PCollection<T> expand(PBegin input) {
      getConfiguration().validate();
      checkArgument(getFileOrPatternSpec() != null, "from() is required");
      return input.apply(com.bff.gaia.unified.sdk.io.Read.from(createSource()));
    }
  }



  /**
   * Implementation of {@link #readFiles}.
   *
   * <p>Every configuration method returns a new instance rebuilt from this one; nothing is modified
   * in place.
   */
  @AutoValue
  public abstract static class ReadFiles<T>
      extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
    /** Returns the JAXB mapping settings used by this transform. */
    abstract MappingConfiguration<T> getConfiguration();

    /** Returns a builder pre-populated with the values of this transform. */
    abstract Builder<T> toBuilder();

    /** AutoValue builder for {@link ReadFiles}. */
    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setConfiguration(MappingConfiguration<T> configuration);

      abstract ReadFiles<T> build();
    }

    /** Returns a copy of this transform that uses the given mapping configuration. */
    private ReadFiles<T> withConfiguration(MappingConfiguration<T> configuration) {
      return toBuilder().setConfiguration(configuration).build();
    }

    /** Like {@link Read#withRootElement}. */
    public ReadFiles<T> withRootElement(String rootElement) {
      return withConfiguration(getConfiguration().withRootElement(rootElement));
    }

    /** Like {@link Read#withRecordElement}. */
    public ReadFiles<T> withRecordElement(String recordElement) {
      return withConfiguration(getConfiguration().withRecordElement(recordElement));
    }

    /** Like {@link Read#withRecordClass}. */
    public ReadFiles<T> withRecordClass(Class<T> recordClass) {
      return withConfiguration(getConfiguration().withRecordClass(recordClass));
    }

    /** Like {@link Read#withCharset}. */
    public ReadFiles<T> withCharset(Charset charset) {
      return withConfiguration(getConfiguration().withCharset(charset));
    }

    /** Like {@link Read#withValidationEventHandler}. */
    public ReadFiles<T> withValidationEventHandler(ValidationEventHandler validationEventHandler) {
      return withConfiguration(
          getConfiguration().withValidationEventHandler(validationEventHandler));
    }

    /**
     * Validates the mapping configuration and reads every incoming file through a {@link
     * ReadAllViaFileBasedSource}, creating one {@link XmlSource} per file and encoding the output
     * with a {@link JAXBCoder} for the record class.
     *
     * @throws IllegalArgumentException if a required mapping setting is missing
     */
    @Override
    public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
      // Fail fast, as Read.expand() does, instead of handing a null record class to JAXBCoder
      // or unset element names to XmlSource.
      getConfiguration().validate();
      return input.apply(
          new ReadAllViaFileBasedSource<>(
              // 64 MiB; presumably the desired bundle size in bytes - confirm against
              // ReadAllViaFileBasedSource.
              64 * 1024L * 1024L,
              new CreateSourceFn<>(getConfiguration()),
              JAXBCoder.of(getConfiguration().getRecordClass())));
    }
  }



  /**
   * Serializable function that turns an input string into an {@link XmlSource} over that string
   * (wrapped as a static {@link ValueProvider}), using one shared {@link MappingConfiguration} and
   * a minimum bundle size argument of {@code 1}.
   */
  private static class CreateSourceFn<T> implements SerializableFunction<String, XmlSource<T>> {
    private final MappingConfiguration<T> configuration;

    CreateSourceFn(MappingConfiguration<T> mappingConfiguration) {
      configuration = mappingConfiguration;
    }

    @Override
    public XmlSource<T> apply(String input) {
      ValueProvider<String> fileSpec = ValueProvider.StaticValueProvider.of(input);
      return new XmlSource<>(fileSpec, configuration, 1L);
    }
  }



  /**
   * Implementation of {@link #write}.
   *
   * <p>Every configuration method returns a new instance rebuilt from this one; nothing is modified
   * in place.
   */
  @AutoValue
  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
    /** Output path prefix, or {@code null} if {@link #to} was not called. */
    @Nullable
    abstract String getFilenamePrefix();

    /** JAXB annotated record class, or {@code null} if not set. */
    @Nullable
    abstract Class<T> getRecordClass();

    /** Name of the enclosing root element, or {@code null} if not set. */
    @Nullable
    abstract String getRootElement();

    /** Name of the output charset, or {@code null} if not set. */
    @Nullable
    abstract String getCharset();

    /** Returns a builder pre-populated with the values of this transform. */
    abstract Builder<T> toBuilder();

    /** AutoValue builder for {@link Write}. */
    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setFilenamePrefix(String prefix);

      abstract Builder<T> setRecordClass(Class<T> recordClass);

      abstract Builder<T> setRootElement(String rootElement);

      abstract Builder<T> setCharset(String charset);

      abstract Write<T> build();
    }

    /**
     * Writes to files with the given path prefix.
     *
     * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
     * the number of output bundles.
     */
    public Write<T> to(String filenamePrefix) {
      Builder<T> updated = toBuilder().setFilenamePrefix(filenamePrefix);
      return updated.build();
    }

    /**
     * Writes objects of the given class mapped to XML elements using JAXB.
     *
     * <p>The specified class must be able to be used to create a JAXB context.
     */
    public Write<T> withRecordClass(Class<T> recordClass) {
      Builder<T> updated = toBuilder().setRecordClass(recordClass);
      return updated.build();
    }

    /** Sets the enclosing root element for the generated XML files. */
    public Write<T> withRootElement(String rootElement) {
      Builder<T> updated = toBuilder().setRootElement(rootElement);
      return updated.build();
    }

    /** Sets the charset used to write the file. */
    public Write<T> withCharset(Charset charset) {
      String charsetName = charset.name();
      return toBuilder().setCharset(charsetName).build();
    }

    /**
     * Checks the required settings, verifies that the record class can be bound to a JAXB
     * context, and then writes the input through {@link FileIO#write} using {@link #sink}.
     *
     * @throws IllegalArgumentException if record class, root element, file name prefix or charset
     *     is missing
     * @throws RuntimeException if the record class cannot be bound to a JAXB context
     */
    @Override
    public PDone expand(PCollection<T> input) {
      checkArgument(getRecordClass() != null, "withRecordClass() is required");
      checkArgument(getRootElement() != null, "withRootElement() is required");
      checkArgument(getFilenamePrefix() != null, "to() is required");
      checkArgument(getCharset() != null, "withCharset() is required");
      verifyJaxbBinding();

      ResourceId prefix =
          FileSystems.matchNewResource(getFilenamePrefix(), false /* isDirectory */);
      String outputDirectory = prefix.getCurrentDirectory().toString();
      String outputFilePrefix = prefix.getFilename();

      Sink<T> xmlSink =
          sink(getRecordClass())
              .withCharset(Charset.forName(getCharset()))
              .withRootElement(getRootElement());

      input.apply(
          FileIO.<T>write()
              .via(xmlSink)
              .to(outputDirectory)
              .withPrefix(outputFilePrefix)
              .withSuffix(".xml")
              .withIgnoreWindowing());

      return PDone.in(input.getPipeline());
    }

    /** Creates a throwaway JAXB context so that an unbindable record class is reported early. */
    private void verifyJaxbBinding() {
      try {
        JAXBContext.newInstance(getRecordClass());
      } catch (JAXBException e) {
        throw new RuntimeException("Error binding classes to a JAXB Context.", e);
      }
    }

    /** Registers root element, record class and charset as display data; unset values are skipped. */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      builder
          .addIfNotNull(
              DisplayData.item("rootElement", getRootElement())
                  .withLabel("XML Root Element"))
          .addIfNotNull(
              DisplayData.item("recordClass", getRecordClass())
                  .withLabel("XML Record Class"))
          .addIfNotNull(
              DisplayData.item("charset", getCharset())
                  .withLabel("Charset"));
    }
  }



  // CHECKSTYLE.OFF: JavadocStyle
  /**
   * Creates a sink that outputs records as XML-formatted elements using JAXB. The returned sink
   * starts with the UTF-8 charset and no root element.
   *
   * <p>The produced file consists of a single root element containing 1 sub-element per element
   * written to the sink.
   *
   * <p>The given class will be used in the marshalling of records in an input PCollection to their
   * XML representation and must be able to be bound using JAXB annotations.
   *
   * <p>For example, consider the following class with JAXB annotations:
   *
   * <pre>
   *  {@literal @}XmlRootElement(name = "word_count_result")
   *  {@literal @}XmlType(propOrder = {"word", "frequency"})
   *  public class WordFrequency {
   *    public String word;
   *    public long frequency;
   *  }
   * </pre>
   *
   * <p>The following will produce XML output with a root element named "words" from a PCollection
   * of WordFrequency objects:
   *
   * <pre>{@code
   * p.apply(FileIO.<WordFrequency>write()
   *     .via(XmlIO.sink(WordFrequency.class).withRootElement("words"))
   *     .to(prefixAndShardTemplate("...", DEFAULT_UNWINDOWED_SHARD_TEMPLATE + ".xml"));
   * }</pre>
   *
   * <p>The output will look like:
   *
   * <pre>{@code
   * <words>
   *  <word_count_result>
   *    <word>decreased</word>
   *    <frequency>1</frequency>
   *  </word_count_result>
   *  <word_count_result>
   *    <word>War</word>
   *    <frequency>4</frequency>
   *  </word_count_result>
   *  <word_count_result>
   *    <word>empress'</word>
   *    <frequency>14</frequency>
   *  </word_count_result>
   *  <word_count_result>
   *    <word>stoops</word>
   *    <frequency>6</frequency>
   *  </word_count_result>
   *  ...
   * </words>
   * }</pre>
   */
  // CHECKSTYLE.ON: JavadocStyle
  public static <T> Sink<T> sink(Class<T> recordClass) {
    String defaultCharsetName = StandardCharsets.UTF_8.name();
    Sink.Builder<T> sinkBuilder = new AutoValue_XmlIO_Sink.Builder<T>();
    return sinkBuilder.setRecordClass(recordClass).setCharset(defaultCharsetName).build();
  }



  /**
   * Implementation of {@link #sink}.
   *
   * <p>{@link #open} writes the opening root tag, {@link #write} marshals one element with JAXB,
   * and {@link #flush} writes the closing root tag.
   */
  @AutoValue
  public abstract static class Sink<T> implements FileIO.Sink<T> {
    /** JAXB annotated class of the records written by this sink. */
    abstract Class<T> getRecordClass();

    /** Name of the enclosing root element, or {@code null} if not set. */
    @Nullable
    abstract String getRootElement();

    /** Name of the charset used for the root tags and as the JAXB encoding. */
    abstract String getCharset();

    /** Returns a builder pre-populated with the values of this sink. */
    abstract Builder<T> toBuilder();

    /** AutoValue builder for {@link Sink}. */
    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setRecordClass(Class<T> clazz);

      abstract Builder<T> setRootElement(String rootElement);

      abstract Builder<T> setCharset(String charset);

      abstract Sink<T> build();
    }

    /** Returns a copy of this sink that wraps its output in the given root element. */
    public Sink<T> withRootElement(String rootElement) {
      Builder<T> updated = toBuilder().setRootElement(rootElement);
      return updated.build();
    }

    /** Returns a copy of this sink that writes using the given charset. */
    public Sink<T> withCharset(Charset charset) {
      String charsetName = charset.name();
      return toBuilder().setCharset(charsetName).build();
    }

    // Per-file state set up by open(); transient, so it is not part of the serialized sink.
    private transient OutputStream outputStream;
    private transient Marshaller marshaller;

    /** Encodes the given text with the configured charset. */
    private byte[] encode(String text) {
      return text.getBytes(Charset.forName(getCharset()));
    }

    /**
     * Creates the JAXB marshaller (formatted, fragment mode, configured encoding) and writes the
     * opening root tag followed by a newline to the channel.
     *
     * @throws IllegalArgumentException if no root element has been set
     * @throws IOException if the marshaller cannot be created or the tag cannot be written
     */
    @Override
    public void open(WritableByteChannel channel) throws IOException {
      checkArgument(getRootElement() != null, ".withRootElement() is required");

      try {
        JAXBContext jaxbContext = JAXBContext.newInstance(getRecordClass());
        marshaller = jaxbContext.createMarshaller();
        marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
        marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
        marshaller.setProperty(Marshaller.JAXB_ENCODING, getCharset());
      } catch (JAXBException e) {
        throw new IOException(e);
      }

      this.outputStream = Channels.newOutputStream(channel);
      String openingTag = "<" + getRootElement() + ">\n";
      outputStream.write(encode(openingTag));
    }

    /**
     * Marshals one element to the output stream.
     *
     * @throws IOException wrapping any {@link JAXBException} raised while marshalling
     */
    @Override
    public void write(T element) throws IOException {
      try {
        marshaller.marshal(element, outputStream);
      } catch (JAXBException e) {
        throw new IOException(e);
      }
    }

    /** Writes a newline followed by the closing root tag. */
    @Override
    public void flush() throws IOException {
      String closingTag = "\n</" + getRootElement() + ">";
      outputStream.write(encode(closingTag));
    }
  }

}